// Copyright Epic Games, Inc. All Rights Reserved.

#include "filecas.h"

#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
#include <zencore/workthreadpool.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>

#if ZEN_WITH_TESTS
#	include <zencore/compactbinarybuilder.h>
#endif

#include <gsl/gsl-lite.hpp>

#include <barrier>
#include <filesystem>
#include <functional>
#include <unordered_map>

ZEN_THIRD_PARTY_INCLUDES_START
#include <xxhash.h>
#if ZEN_PLATFORM_WINDOWS
#	include <zencore/windows.h>
#else
#	include <fcntl.h>
#	include <sys/stat.h>
#	include <unistd.h>
#endif
ZEN_THIRD_PARTY_INCLUDES_END

namespace zen {

namespace {
	// Replaces the contents of V with those of a default-constructed T. The
	// previous contents end up in a local that is destroyed on return.
	template<typename T>
	void Reset(T& V)
	{
		T Empty;
		Empty.swap(V);
	}
}  // namespace

namespace filecas::impl {
	// File name extensions used for the index snapshot and the cas log
	const char* IndexExtension = ".uidx";
	const char* LogExtension   = ".ulog";

	// Path of the index snapshot file ("cas.uidx") inside RootDir
	std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir)
	{
		const std::string FileName = fmt::format("cas{}", IndexExtension);
		return RootDir / FileName;
	}

	// Path of the temporary index file ("cas.tmp.uidx") inside RootDir
	std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir)
	{
		const std::string FileName = fmt::format("cas.tmp{}", IndexExtension);
		return RootDir / FileName;
	}

	// Path of the cas log file ("cas.ulog") inside RootDir
	std::filesystem::path GetLogPath(const std::filesystem::path& RootDir)
	{
		const std::string FileName = fmt::format("cas{}", LogExtension);
		return RootDir / FileName;
	}

#pragma pack(push)
#pragma pack(1)

	// Header record of the index file. Packed to exactly 32 bytes (see the
	// static_assert below).
	struct FileCasIndexHeader
	{
		static constexpr uint32_t ExpectedMagic	 = 0x75696478;	// 'uidx';
		static constexpr uint32_t CurrentVersion = 1;

		uint32_t Magic		 = ExpectedMagic;
		uint32_t Version	 = CurrentVersion;
		uint64_t EntryCount	 = 0;
		uint64_t LogPosition = 0;
		uint32_t Reserved	 = 0;
		uint32_t Checksum	 = 0;

		// XXH32 over every byte of the header except the trailing Checksum field
		static uint32_t ComputeChecksum(const FileCasIndexHeader& Header)
		{
			constexpr size_t HashedByteCount = sizeof(FileCasIndexHeader) - sizeof(uint32_t);
			return XXH32(&Header.Magic, HashedByteCount, 0xC0C0'BABA);
		}
	};

	static_assert(sizeof(FileCasIndexHeader) == 32);

#pragma pack(pop)

}  // namespace filecas::impl

// Builds the sharded on-disk path for ChunkHash below RootPath:
//   <root>/<hex[0,3)>/<hex[3,5)>/<hex[5,40)>
// Shard2len records the length of the path up to and including the second
// directory level.
FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
{
	// Offsets into the hex string at which each path component ends
	constexpr int Shard1End = 3;
	constexpr int Shard2End = 5;
	constexpr int HashEnd	= 40;

	ExtendableStringBuilder<64> HexBuilder;
	ChunkHash.ToHexString(HexBuilder);
	const char* const Hex = HexBuilder.c_str();

	ShardedPath.Append(RootPath.c_str());

	// Shard into a path with two directory levels containing 12 bits and 8 bits
	// respectively (three and two hex characters).
	//
	// This results in a maximum of 4096 * 256 directories
	//
	// The numbers have been chosen somewhat arbitrarily but are large to scale
	// to very large chunk repositories without creating too many directories
	// on a single level since NTFS does not deal very well with this.
	//
	// It may or may not make sense to make this a configurable policy, and it
	// would probably be a good idea to measure performance for different
	// policies and chunk counts

	// First directory level
	ShardedPath.AppendSeparator();
	ShardedPath.AppendAsciiRange(Hex, Hex + Shard1End);

	// Second directory level
	ShardedPath.AppendSeparator();
	ShardedPath.AppendAsciiRange(Hex + Shard1End, Hex + Shard2End);
	Shard2len = ShardedPath.Size();

	// File name: the rest of the hex string
	ShardedPath.AppendSeparator();
	ShardedPath.AppendAsciiRange(Hex + Shard2End, Hex + HashEnd);
}

//////////////////////////////////////////////////////////////////////////

// Load factor bounds applied to m_Index when the strategy is constructed
static const float IndexMinLoadFactor = 0.2f;
static const float IndexMaxLoadFactor = 0.7f;

// Sets up logging and the index load factors, then registers this instance
// with the GC manager both as a GC storage and as a GC reference store. The
// destructor undoes both registrations.
FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc)
{
	m_Index.min_load_factor(IndexMinLoadFactor);
	m_Index.max_load_factor(IndexMaxLoadFactor);

	m_Gc.AddGcStorage(this);
	m_Gc.AddGcReferenceStore(*this);
}

// Unregisters this instance from the GC manager, in the reverse order of the
// registrations made by the constructor.
FileCasStrategy::~FileCasStrategy()
{
	m_Gc.RemoveGcReferenceStore(*this);
	m_Gc.RemoveGcStorage(this);
}

void
FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
{
	ZEN_TRACE_CPU("FileCas::Initialize");

	using namespace filecas::impl;

	m_IsInitialized = true;

	m_RootDirectory = RootDirectory;

	m_Index.clear();

	std::filesystem::path LogPath	= GetLogPath(m_RootDirectory);
	std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);

	if (IsNewStore)
	{
		std::filesystem::remove(LogPath);
		std::filesystem::remove(IndexPath);

		if (std::filesystem::is_directory(m_RootDirectory))
		{
			// We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside
			// in this folder as well
			struct Visitor : public FileSystemTraversal::TreeVisitor
			{
				virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override
				{
					// We don't care about files
				}
				static bool IsHexChar(std::filesystem::path::value_type C)
				{
					return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16];
				}
				virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
											[[maybe_unused]] const path_view&			  DirectoryName) override
				{
					if (DirectoryName.length() == 3)
					{
						if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2]))
						{
							ShardedRoots.push_back(Parent / DirectoryName);
						}
					}
					return false;
				}
				std::vector<std::filesystem::path> ShardedRoots;
			} CasVisitor;

			FileSystemTraversal Traversal;
			Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
			for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots)
			{
				std::filesystem::remove_all(SharededRoot);
			}
		}
	}

	if (std::filesystem::is_regular_file(IndexPath))
	{
		uint32_t IndexVersion = 0;
		m_LogFlushPosition	  = ReadIndexFile(IndexPath, IndexVersion);
		if (IndexVersion == 0)
		{
			ZEN_WARN("removing invalid index file at '{}'", IndexPath);
			std::filesystem::remove(IndexPath);
		}
	}

	uint64_t LogEntryCount = 0;
	if (std::filesystem::is_regular_file(LogPath))
	{
		if (TCasLogFile<FileCasIndexEntry>::IsValid(LogPath))
		{
			LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
		}
		else
		{
			ZEN_WARN("removing invalid cas log at '{}'", LogPath);
			std::filesystem::remove(LogPath);
		}
	}

	for (const auto& Entry : m_Index)
	{
		m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed);
	}

	CreateDirectories(m_RootDirectory);
	m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);

	if (IsNewStore || LogEntryCount > 0)
	{
		MakeIndexSnapshot();
	}
}

// Inserts a chunk into the store under ChunkHash.
//
// In kCopyOnly mode the payload bytes are written through InsertChunkData
// (unless the hash is already indexed). Otherwise, if the buffer is backed by a
// whole file, the file itself is moved (Windows) or hard-linked (Linux/Mac)
// into its sharded location, thus avoiding a copy; if that fails for a reason
// other than "target already exists" the function falls back to
// InsertChunkData.
//
// The returned InsertResult::New is true when this call added the hash to the
// in-memory index. On Windows a failure to open the shard directory is reported
// through ThrowSystemException.
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
{
	ZEN_TRACE_CPU("FileCas::InsertChunk");

	ZEN_ASSERT(m_IsInitialized);

#if !ZEN_WITH_TESTS
	ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif

	if (Mode == CasStore::InsertMode::kCopyOnly)
	{
		{
			RwLock::SharedLockScope _(m_Lock);
			if (m_Index.contains(ChunkHash))
			{
				return CasStore::InsertResult{.New = false};
			}
		}
		return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash);
	}

	// File-based chunks have special case handling whereby we move the file into
	// place in the file store directory, thus avoiding unnecessary copying

	IoBufferFileReference FileRef;
	bool				  IsWholeFile = Chunk.IsWholeFile();
	if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef))
	{
		ZEN_TRACE_CPU("FileCas::InsertChunk::Move");

		{
			bool Exists = true;
			{
				RwLock::SharedLockScope _(m_Lock);
				Exists = m_Index.contains(ChunkHash);
			}
			if (Exists)
			{
				return CasStore::InsertResult{.New = false};
			}
		}

		// Adds the chunk to the in-memory index and, if it was not already present, accounts for
		// its size in the running total. Returns true if the index entry is new.
		auto AddToIndex = [this, &ChunkHash](uint64_t Size) -> bool {
			bool IsNew = false;
			{
				RwLock::ExclusiveLockScope __(m_Lock);
				IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Size}}).second;
			}
			if (IsNew)
			{
				m_TotalSize.fetch_add(Size, std::memory_order::relaxed);
			}
			return IsNew;
		};

		ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);

		RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));

#if ZEN_PLATFORM_WINDOWS
		const HANDLE ChunkFileHandle = FileRef.FileHandle;
		// See if file already exists
		{
			ZEN_TRACE_CPU("FileCas::InsertChunk::Exists");

			windows::FileHandle PayloadFile;

			if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
			{
				// If we succeeded in opening the target file then we don't need to do anything else because it already exists
				// and should contain the content we were about to insert

				// We do need to ensure the source file goes away on close, however
				size_t	 ChunkSize = Chunk.GetSize();
				uint64_t FileSize  = 0;
				if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
				{
					HashLock.ReleaseNow();

					return CasStore::InsertResult{.New = AddToIndex(ChunkSize)};
				}
				else
				{
					ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
							 Name.ShardedPath.ToUtf8(),
							 ChunkSize,
							 FileSize);
				}
			}
			else
			{
				if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
				{
					// Shard directory does not exist
				}
				else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
				{
					// Shard directory exists, but not the file
				}
				else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION))
				{
					// Sharing violation, likely because we are trying to open a file
					// which has been renamed on another thread, and the file handle
					// used to rename it is still open. We handle this case below
					// instead of here
				}
				else
				{
					ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes);
				}
			}
		}

		std::filesystem::path FullPath(Name.ShardedPath.c_str());

		std::filesystem::path FilePath = FullPath.parent_path();
		std::wstring		  FileName = FullPath.native();

		// sizeof(FILE_RENAME_INFO) already includes one WCHAR of FileName, which leaves room for the terminator
		const DWORD		  FileNameBytes = gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR));
		const DWORD		  BufferSize	= sizeof(FILE_RENAME_INFO) + FileNameBytes;
		FILE_RENAME_INFO* RenameInfo	= reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize));
		memset(RenameInfo, 0, BufferSize);

		RenameInfo->ReplaceIfExists = FALSE;
		// FileNameLength is documented as the size of FileName in bytes, not in characters
		RenameInfo->FileNameLength = FileNameBytes;
		memcpy(RenameInfo->FileName, FileName.c_str(), FileNameBytes);
		RenameInfo->FileName[FileName.size()] = 0;

		auto FreeRenameInfo = MakeGuard([&] { Memory::Free(RenameInfo); });

		// Try to move file into place
		BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);

		if (!Success)
		{
			// The rename/move could fail because the target directory does not yet exist. This code attempts
			// to create it

			windows::FileHandle DirHandle;

			auto InternalCreateDirectoryHandle = [&] {
				return DirHandle.Create(FilePath.c_str(),
										GENERIC_READ | GENERIC_WRITE,
										FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
										OPEN_EXISTING,
										FILE_FLAG_BACKUP_SEMANTICS);
			};

			// It's possible for several threads to enter this logic trying to create the same
			// directory. Only one will create the directory of course, but all threads will
			// make it through okay

			HRESULT hRes = InternalCreateDirectoryHandle();

			if (FAILED(hRes))
			{
				// TODO: we can handle directory creation more intelligently and efficiently than
				// this currently does

				CreateDirectories(FilePath.c_str());

				hRes = InternalCreateDirectoryHandle();
			}

			if (FAILED(hRes))
			{
				ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath));
			}

			// Retry rename/move

			Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
		}

		if (Success)
		{
			m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
			Chunk.SetDeleteOnClose(false);

			HashLock.ReleaseNow();

			return CasStore::InsertResult{.New = AddToIndex(Chunk.Size())};
		}

		const DWORD LastError = GetLastError();

		if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
		{
			// Someone else put the payload in place already, just make sure the index knows about it
			HashLock.ReleaseNow();

			return CasStore::InsertResult{.New = AddToIndex(Chunk.Size())};
		}
		ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
				 GetSystemErrorAsString(LastError),
				 ChunkHash);

#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC

		std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
		std::filesystem::path DestPath	 = Name.ShardedPath.c_str();
		int					  Ret		 = link(SourcePath.c_str(), DestPath.c_str());
		if (Ret < 0 && zen::GetLastError() == ENOENT)
		{
			// Destination directory doesn't exist. Create it and try again.
			CreateDirectories(DestPath.parent_path().c_str());
			Ret = link(SourcePath.c_str(), DestPath.c_str());
		}
		if (Ret == 0)
		{
			m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
			Chunk.SetDeleteOnClose(false);

			HashLock.ReleaseNow();

			return CasStore::InsertResult{.New = AddToIndex(Chunk.Size())};
		}
		else
		{
			int LinkError = zen::GetLastError();

			// It is possible that someone beat us to it in linking the file. In that
			// case a "file exists" error is okay. All others are not.
			if (LinkError == EEXIST)
			{
				HashLock.ReleaseNow();

				return CasStore::InsertResult{.New = AddToIndex(Chunk.Size())};
			}

			ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
					 GetSystemErrorAsString(LinkError),
					 ChunkHash);
		}
#endif	// ZEN_PLATFORM_*
	}

	return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash);
}

// Writes ChunkSize bytes starting at ChunkData to the sharded payload file for
// ChunkHash, appends an entry to the cas log and adds the chunk to the
// in-memory index.
//
// If the hash is already indexed, or the payload file turns out to exist
// already, nothing is written. The returned InsertResult::New is true when this
// call added the hash to the in-memory index.
//
// Throws if the shard directory or the payload file cannot be created. On
// non-Windows platforms a failed or incomplete write also throws
// std::system_error, after removing the partially written file.
CasStore::InsertResult
FileCasStrategy::InsertChunkData(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
{
	ZEN_TRACE_CPU("FileCas::InsertChunkData");

	ZEN_ASSERT(m_IsInitialized);

	{
		RwLock::SharedLockScope _(m_Lock);
		if (m_Index.contains(ChunkHash))
		{
			return {.New = false};
		}
	}

	ZEN_TRACE_CPU("FileCas::InsertChunkData::Write");

	// Adds the chunk to the in-memory index and, if it was not already present, accounts for
	// its size in the running total. Returns true if the index entry is new.
	auto AddToIndex = [this, &ChunkHash, ChunkSize]() -> bool {
		bool IsNew = false;
		{
			RwLock::ExclusiveLockScope __(m_Lock);
			IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
		}
		if (IsNew)
		{
			m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
		}
		return IsNew;
	};

	ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);

	// See if file already exists

#if ZEN_PLATFORM_WINDOWS
	windows::FileHandle PayloadFile;

	HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);

	if (SUCCEEDED(hRes))
	{
		// If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the
		// content we were about to insert

		return CasStore::InsertResult{.New = AddToIndex()};
	}

	PayloadFile.Close();
#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
	// NOTE(review): unlike the Windows branch above, this early-out does not add the chunk to the
	// in-memory index
	if (access(Name.ShardedPath.c_str(), F_OK) == 0)
	{
		return CasStore::InsertResult{.New = false};
	}
#endif

	RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));

#if ZEN_PLATFORM_WINDOWS
	// For now, use double-checked locking to see if someone else was first

	hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);

	if (SUCCEEDED(hRes))
	{
		uint64_t FileSize = 0;
		if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
		{
			// If we succeeded in opening the file then and the size is correct we don't need to do anything
			// else because someone else managed to create the file before we did. Just return.

			HashLock.ReleaseNow();
			return CasStore::InsertResult{.New = AddToIndex()};
		}
		else
		{
			ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
					 Name.ShardedPath.ToUtf8(),
					 ChunkSize,
					 FileSize);
		}
	}

	if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)))
	{
		ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
	}

	auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };

	hRes = InternalCreateFile();

	if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
	{
		// Ensure parent directories exist and retry file creation
		CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
		hRes = InternalCreateFile();
	}

	if (FAILED(hRes))
	{
		ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8()));
	}
#else
	// Attempt to exclusively create the file.
	auto InternalCreateFile = [&] {
		int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
		if (Fd >= 0)
		{
			fchmod(Fd, 0666);
		}
		return Fd;
	};
	int Fd = InternalCreateFile();
	if (Fd < 0)
	{
		switch (zen::GetLastError())
		{
			case EEXIST:
				// Another thread has beat us to it so we're golden.
				{
					HashLock.ReleaseNow();

					return {.New = AddToIndex()};
				}
				break;

			case ENOENT:
				if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len)))
				{
					Fd = InternalCreateFile();
					if (Fd >= 0)
					{
						break;
					}
				}
				ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath));

			default:
				ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8()));
		}
	}

	struct FdWrapper
	{
		~FdWrapper() { Close(); }

		// Writes all Size bytes, looping over short writes and retrying when interrupted. On
		// failure the partially written file is removed, so that a truncated payload can never be
		// mistaken for a complete chunk, and std::system_error is thrown.
		void Write(const void* Cursor, size_t Size)
		{
			const uint8_t* Bytes = reinterpret_cast<const uint8_t*>(Cursor);
			while (Size != 0)
			{
				const ssize_t Written = write(Fd, Bytes, Size);
				if (Written > 0)
				{
					Bytes += Written;
					Size -= static_cast<size_t>(Written);
					continue;
				}

				// A zero-byte write for a non-zero request is treated as an I/O error
				const int Error = (Written < 0) ? zen::GetLastError() : EIO;
				if (Written < 0 && Error == EINTR)
				{
					continue;
				}

				Close();
				unlink(Path);
				throw std::system_error(std::error_code(Error, std::system_category()),
										fmt::format("Failed writing shard file '{}'", Path));
			}
		}
		void Close()
		{
			if (Fd >= 0)
			{
				close(Fd);
				Fd = -1;
			}
		}
		int			Fd;
		const char* Path;  // Not owned, must outlive the wrapper
	} PayloadFile = {Fd, Name.ShardedPath.c_str()};
#endif	// ZEN_PLATFORM_WINDOWS

	size_t ChunkRemain = ChunkSize;
	auto   ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData);

	// Write the payload in pieces of at most 4 MiB
	while (ChunkRemain != 0)
	{
		uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain));

		PayloadFile.Write(ChunkCursor, ByteCount);

		ChunkCursor += ByteCount;
		ChunkRemain -= ByteCount;
	}

	// We cannot rely on RAII to close the file handle since it would be closed
	// *after* the lock is released due to the initialization order
	PayloadFile.Close();

	m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});

	HashLock.ReleaseNow();

	return {.New = AddToIndex()};
}

// Looks up ChunkHash and returns a buffer for its payload file. An empty
// buffer is returned when the hash is not in the index or when the size of the
// buffer obtained for the file differs from the size recorded in the index.
IoBuffer
FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
	ZEN_TRACE_CPU("FileCas::FindChunk");

	ZEN_ASSERT(m_IsInitialized);

	// Fetch the recorded size while holding the index lock
	uint64_t IndexedSize = 0;
	{
		RwLock::SharedLockScope IndexLock(m_Lock);
		const auto				Found = m_Index.find(ChunkHash);
		if (Found == m_Index.end())
		{
			return {};
		}
		IndexedSize = Found->second.Size;
	}

	ShardingHelper			Path(m_RootDirectory.c_str(), ChunkHash);
	RwLock::SharedLockScope HashLock(LockForHash(ChunkHash));

	IoBuffer Payload = IoBufferBuilder::MakeFromFile(Path.ShardedPath.c_str());
	if (Payload.GetSize() != IndexedSize)
	{
		return {};
	}
	return Payload;
}

// Returns true if ChunkHash is present in the in-memory index. The file system
// is not consulted.
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
	ZEN_ASSERT(m_IsInitialized);

	RwLock::SharedLockScope IndexLock(m_Lock);
	const bool				IsIndexed = m_Index.contains(ChunkHash);
	return IsIndexed;
}

// Removes the payload file for ChunkHash. If the removal succeeded, or the file
// does not exist afterwards, the hash is also dropped from the index (adjusting
// the total size) and a tombstone entry is appended to the cas log.
//
// Ec receives the result of the file removal.
void
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
	ZEN_TRACE_CPU("FileCas::DeleteChunk");

	ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);

	// The on-disk size is used for logging and for the tombstone entry; fall back to zero
	// if it can't be determined
	uint64_t PayloadSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
	if (Ec)
	{
		ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8());
		PayloadSize = 0;
	}

	ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(PayloadSize));
	std::filesystem::remove(Name.ShardedPath.c_str(), Ec);

	const bool FileIsGone = !Ec || !std::filesystem::exists(Name.ShardedPath.c_str());
	if (!FileIsGone)
	{
		return;
	}

	{
		RwLock::ExclusiveLockScope IndexLock(m_Lock);
		const auto				   Found = m_Index.find(ChunkHash);
		if (Found != m_Index.end())
		{
			m_TotalSize.fetch_sub(Found->second.Size, std::memory_order_relaxed);
			m_Index.erase(Found);
		}
	}
	m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = PayloadSize});
}

// Removes from InOutChunks every hash for which HaveChunk returns true.
void
FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
{
	ZEN_ASSERT(m_IsInitialized);

	// NOTE: it's not a problem now, but in the future if a GC should happen while this
	// is in flight, the result could be wrong since chunks could go away in the meantime.
	//
	// It would be good to have a pinning mechanism to make this less likely but
	// given that chunks could go away at any point after the results are returned to
	// a caller, this is something which needs to be taken into account by anyone consuming
	// this functionality in any case

	InOutChunks.RemoveHashesIf([this](const IoHash& Candidate) {
		const bool AlreadyStored = HaveChunk(Candidate);
		return AlreadyStored;
	});
}

// Invokes AsyncCallback with (index into ChunkHashes, payload) for every
// requested hash that is present in the index and whose payload file could be
// turned into a non-empty buffer.
//
// With a worker pool the work items are scheduled on the pool and this call
// waits for all of them before returning; without one the chunks are processed
// inline. A callback returning false stops further processing. Exceptions
// thrown on pool threads are logged and swallowed.
//
// Returns false if a callback asked to stop, true otherwise.
bool
FileCasStrategy::IterateChunks(std::span<IoHash>												 ChunkHashes,
							   const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
							   WorkerThreadPool*												 OptionalWorkerPool)
{
	// Resolve which of the requested hashes are known while holding the index lock only once
	std::vector<size_t> KnownIndexes;
	{
		RwLock::SharedLockScope IndexLock(m_Lock);
		size_t					RequestIndex = 0;
		for (const IoHash& Requested : ChunkHashes)
		{
			if (m_Index.find(Requested) != m_Index.end())
			{
				KnownIndexes.push_back(RequestIndex);
			}
			++RequestIndex;
		}
	}

	std::atomic_bool Continue = true;
	if (KnownIndexes.empty())
	{
		return Continue;
	}

	// Loads one payload and hands it to the callback; returns false if the callback wants to stop
	auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex) {
		ShardingHelper Path(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
		IoBuffer	   Payload = IoBufferBuilder::MakeFromFile(Path.ShardedPath.c_str());
		if (!Payload)
		{
			return true;
		}
		const bool KeepGoing = AsyncCallback(ChunkIndex, std::move(Payload));
		return KeepGoing;
	};

	// The initial count of one is released below once everything has been scheduled
	Latch WorkLatch(1);
	for (size_t ChunkIndex : KnownIndexes)
	{
		if (!Continue)
		{
			break;
		}

		if (OptionalWorkerPool == nullptr)
		{
			if (!ProcessOne(ChunkIndex))
			{
				Continue = false;
			}
			continue;
		}

		WorkLatch.AddCount(1);
		OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() {
			auto ReleaseLatch = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
			if (!Continue)
			{
				return;
			}
			try
			{
				const bool KeepGoing = ProcessOne(ChunkIndex);
				if (!KeepGoing)
				{
					Continue = false;
				}
			}
			catch (const std::exception& Ex)
			{
				ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
						 m_RootDirectory,
						 ChunkHashes[ChunkIndex],
						 Ex.what());
			}
		});
	}
	WorkLatch.CountDown();
	WorkLatch.Wait();

	return Continue;
}

void
FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback)
{
	ZEN_TRACE_CPU("FileCas::IterateChunks");

	ZEN_ASSERT(m_IsInitialized);

	RwLock::SharedLockScope _(m_Lock);
	for (const auto& It : m_Index)
	{
		const IoHash&  NameHash = It.first;
		ShardingHelper Name(m_RootDirectory.c_str(), NameHash);
		IoBuffer	   Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
		Callback(NameHash, std::move(Payload));
	}
}

// Calls Callback with (hash, recorded size) for every entry in the index. Only
// the in-memory index is consulted. The index lock is held in shared mode for
// the whole iteration.
void
FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback)
{
	ZEN_TRACE_CPU("FileCas::IterateChunks");

	ZEN_ASSERT(m_IsInitialized);

	RwLock::SharedLockScope IndexLock(m_Lock);
	for (const auto& KeyAndEntry : m_Index)
	{
		Callback(KeyAndEntry.first, KeyAndEntry.second.Size);
	}
}

// Flushes the cas log and then requests a new index snapshot.
void
FileCasStrategy::Flush()
{
	ZEN_TRACE_CPU("FileCas::Flush");

	m_CasLog.Flush();
	MakeIndexSnapshot();
}

// Validates every payload file of the store.
//
// First the root folder is scanned and files which are missing from the index
// are added to it (and to the cas log). Then every indexed chunk is checked: the
// compressed header has to be valid, the raw hash in the header has to match
// the chunk's key, and the hash of the fully decompressed data has to match as
// well. Chunks failing any of these checks are reported to Ctx as bad and, if
// Ctx.RunRecovery() is set, deleted.
//
// Nothing is done when Ctx.IsSkipCas() is set.
void
FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
{
	ZEN_TRACE_CPU("FileCas::ScrubStorage");

	if (Ctx.IsSkipCas())
	{
		ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory);
		return;
	}

	Stopwatch Timer;
	ZEN_INFO("scrubbing file CAS @ '{}'", m_RootDirectory);

	ZEN_ASSERT(m_IsInitialized);

	std::vector<IoHash> BadHashes;
	uint64_t			ChunkCount{0}, ChunkBytes{0};

	int	   DiscoveredFilesNotInIndex = 0;
	size_t IndexedFileCount			 = 0;

	{
		std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
		RwLock::ExclusiveLockScope						_(m_Lock);
		for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
		{
			if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second)
			{
				m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed);
				m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size});
				++DiscoveredFilesNotInIndex;
			}
		}
		// Sample the size while still holding the lock, the index must not be read without it
		IndexedFileCount = m_Index.size();
	}

	ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", IndexedFileCount, m_RootDirectory, DiscoveredFilesNotInIndex);

	IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
		if (!Payload)
		{
			BadHashes.push_back(Hash);
			return;
		}
		++ChunkCount;
		ChunkBytes += Payload.GetSize();

		IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload);

		IoHash	 RawHash;
		uint64_t RawSize;
		if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize))
		{
			if (RawHash == Hash)
			{
				// Header hash matches the file name, full validation requires that
				// we check that the decompressed data hash also matches

				CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer));

				OodleCompressor		  Compressor;
				OodleCompressionLevel CompressionLevel;
				uint64_t			  BlockSize;
				if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
				{
					// Pick the size of the decompression window: a fixed default when the block size
					// is unknown, otherwise a multiple of the block size of at most 1 MiB
					if (BlockSize == 0)
					{
						BlockSize = 256 * 1024;
					}
					else if (BlockSize < (1024 * 1024))
					{
						BlockSize = BlockSize * (1024 * 1024 / BlockSize);
					}

					std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]);

					IoHashStream Hasher;

					bool	 DecompressedOk = true;
					uint64_t RawOffset		= 0;
					while (RawSize)
					{
						const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize);

						if (!CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize),
														RawOffset))
						{
							// The chunk is bad, there's no point in decompressing and hashing the remainder
							DecompressedOk = false;
							break;
						}

						Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize);

						RawSize -= DecompressedBlockSize;
						RawOffset += DecompressedBlockSize;
					}

					if (DecompressedOk && Hasher.GetHash() == Hash)
					{
						// all good
						return;
					}
				}
			}
		}

		BadHashes.push_back(Hash);
	});

	Ctx.ReportScrubbed(ChunkCount, ChunkBytes);

	if (!BadHashes.empty())
	{
		ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory);

		if (Ctx.RunRecovery())
		{
			ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());

			for (const IoHash& Hash : BadHashes)
			{
				std::error_code Ec;
				DeleteChunk(Hash, Ec);

				if (Ec)
				{
					ZEN_WARN("failed to delete file for chunk {}: {}", Hash, Ec.message());
				}
			}
		}
		else
		{
			ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size());
		}
	}

	// Let whomever it concerns know about the bad chunks. This could
	// be used to invalidate higher level data structures more efficiently
	// than a full validation pass might be able to do
	Ctx.ReportBadCidChunks(BadHashes);

	ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}",
			 m_RootDirectory,
			 ChunkCount,
			 NiceBytes(ChunkBytes),
			 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}

// Deletes every indexed chunk which GcCtx.FilterCids does not ask to keep.
//
// Nothing is done when GcCtx.SkipCid() is set. When deletion mode is disabled
// the candidates are only counted and logged. The full list of deletion
// candidates is reported through GcCtx.AddDeletedCids once deletion has been
// attempted.
void
FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
	ZEN_TRACE_CPU("FileCas::CollectGarbage");

	ZEN_ASSERT(m_IsInitialized);

	if (GcCtx.SkipCid())
	{
		return;
	}

	ZEN_DEBUG("collecting garbage from {}", m_RootDirectory);

	std::vector<IoHash>	  ChunksToDelete;
	std::atomic<uint64_t> ChunksToDeleteBytes{0};
	std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};

	// Single element scratch vector, reused to filter one hash at a time
	std::vector<IoHash> SingleCandidate;
	SingleCandidate.resize(1);

	uint64_t	   DeletedCount = 0;
	const uint64_t SizeBefore	= m_TotalSize.load(std::memory_order::relaxed);

	Stopwatch  TotalTimer;
	const auto LogSummary = MakeGuard([&] {
		ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
				  m_RootDirectory,
				  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
				  DeletedCount,
				  ChunkCount.load(),
				  NiceBytes(SizeBefore - m_TotalSize.load(std::memory_order::relaxed)),
				  NiceBytes(SizeBefore));
	});

	{
		ZEN_TRACE_CPU("FileCas::CollectGarbage::Filter");
		IterateChunks([&](const IoHash& Hash, uint64_t Size) {
			// The filter invokes the inner callback for hashes that should be retained
			bool Retain		   = false;
			SingleCandidate[0] = Hash;
			GcCtx.FilterCids(SingleCandidate, [&](const IoHash& KeptHash) {
				ZEN_UNUSED(KeptHash);
				Retain = true;
			});

			if (!Retain)
			{
				ChunksToDelete.push_back(Hash);
				ChunksToDeleteBytes.fetch_add(Size);
			}

			++ChunkCount;
			ChunkBytes.fetch_add(Size);
		});
	}

	// TODO, any entries we did not encounter during our IterateChunks should be removed from the index

	if (ChunksToDelete.empty())
	{
		ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
		return;
	}

	ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
			  m_RootDirectory,
			  ChunksToDelete.size(),
			  ChunkCount.load(),
			  NiceBytes(ChunksToDeleteBytes));

	if (!GcCtx.IsDeletionMode())
	{
		ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled");

		return;
	}

	for (const IoHash& Doomed : ChunksToDelete)
	{
		ZEN_TRACE("deleting chunk {}", Doomed);

		std::error_code Ec;
		DeleteChunk(Doomed, Ec);

		if (!Ec)
		{
			DeletedCount++;
			continue;
		}
		ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Doomed, Ec.message());
	}

	GcCtx.AddDeletedCids(ChunksToDelete);
}

// Reports the running total of payload bytes tracked by the index as the disk
// size of this storage.
GcStorageSize
FileCasStrategy::StorageSize() const
{
	const auto TotalBytes = m_TotalSize.load(std::memory_order::relaxed);
	return {.DiskSize = TotalBytes};
}

// Checks a log/index entry for plausibility. Returns true if the entry is
// acceptable; otherwise returns false and describes the problem in OutReason.
//
// An entry is rejected if its key is the zero hash, if it carries any flag
// other than kTombStone, or if it is not a tombstone and has a size of zero.
bool
FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason)
{
	if (Entry.Key == IoHash::Zero)
	{
		OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
		return false;
	}

	if (Entry.Flags & (~FileCasIndexEntry::kTombStone))
	{
		OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
		return false;
	}

	// Tombstones are not subject to the size check below
	const bool IsTombstone = Entry.IsFlagSet(FileCasIndexEntry::kTombStone);
	if (!IsTombstone)
	{
		const uint64_t EntrySize = Entry.Size;
		if (EntrySize == 0)
		{
			OutReason = fmt::format("Invalid size {} for entry {}", EntrySize, Entry.Key.ToHexString());
			return false;
		}
	}

	return true;
}

void
FileCasStrategy::MakeIndexSnapshot()
{
	ZEN_TRACE_CPU("FileCas::MakeIndexSnapshot");

	using namespace filecas::impl;

	uint64_t LogCount = m_CasLog.GetLogCount();
	if (m_LogFlushPosition == LogCount)
	{
		return;
	}

	ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory);
	uint64_t   EntryCount = 0;
	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
				 m_RootDirectory,
				 EntryCount,
				 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});

	namespace fs = std::filesystem;

	fs::path IndexPath	   = GetIndexPath(m_RootDirectory);
	fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory);

	// Move index away, we keep it if something goes wrong
	if (fs::is_regular_file(STmpIndexPath))
	{
		std::error_code Ec;
		if (!fs::remove(STmpIndexPath, Ec) || Ec)
		{
			ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message());
			return;
		}
	}

	try
	{
		if (fs::is_regular_file(IndexPath))
		{
			fs::rename(IndexPath, STmpIndexPath);
		}

		// Write the current state of the location map to a new index state
		std::vector<FileCasIndexEntry> Entries;
		uint64_t					   IndexLogPosition = 0;

		{
			RwLock::SharedLockScope __(m_Lock);
			IndexLogPosition = m_CasLog.GetLogCount();
			Entries.resize(m_Index.size());

			uint64_t EntryIndex = 0;
			for (const auto& Entry : m_Index)
			{
				FileCasIndexEntry& IndexEntry = Entries[EntryIndex++];
				IndexEntry.Key				  = Entry.first;
				IndexEntry.Size				  = Entry.second.Size;
			}
		}

		TemporaryFile	ObjectIndexFile;
		std::error_code Ec;
		ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec);
		if (Ec)
		{
			throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath));
		}
		filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = IndexLogPosition};

		Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header);

		ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0);
		ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader));
		ObjectIndexFile.Flush();
		ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
		if (Ec)
		{
			throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath));
		}
		EntryCount		   = Entries.size();
		m_LogFlushPosition = IndexLogPosition;
	}
	catch (const std::exception& Err)
	{
		ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());

		// Restore any previous snapshot

		if (fs::is_regular_file(STmpIndexPath))
		{
			std::error_code Ec;
			fs::remove(IndexPath, Ec);	// We don't care if this fails, we try to move the old temp file regardless
			fs::rename(STmpIndexPath, IndexPath, Ec);
			if (Ec)
			{
				ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message());
			}
		}
	}
	if (fs::is_regular_file(STmpIndexPath))
	{
		std::error_code Ec;
		if (!fs::remove(STmpIndexPath, Ec) || Ec)
		{
			ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message());
		}
	}
}
// Populates m_Index from the index snapshot at IndexPath.
//
// If IndexPath is a regular file, its header is checked (magic, version,
// checksum, and that the declared entry count fits in the file) and every
// entry that passes ValidateEntry is inserted into m_Index. On success
// OutVersion is set to the current index version and the log position stored
// in the header is returned. A file that is too small or fails the header
// checks yields 0 and leaves OutVersion untouched.
//
// If there is no index file but m_RootDirectory exists, the directory is
// scanned for payload files; the entries found are inserted into m_Index and
// written to a freshly truncated log. This path returns 0.
uint64_t
FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
	ZEN_TRACE_CPU("FileCas::ReadIndexFile");

	using namespace filecas::impl;

	std::vector<FileCasIndexEntry> Entries;
	if (std::filesystem::is_regular_file(IndexPath))
	{
		Stopwatch  Timer;
		const auto _ = MakeGuard([&] {
			ZEN_INFO("read store '{}' index containing {} entries in {}",
					 IndexPath,
					 Entries.size(),
					 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
		});

		BasicFile ObjectIndexFile;
		ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
		uint64_t Size = ObjectIndexFile.FileSize();
		if (Size >= sizeof(FileCasIndexHeader))
		{
			// Upper bound on the number of whole entries that can follow the header in a file of this size.
			// (Subtracting the header size is safe because of the size check above.)
			uint64_t		   ExpectedEntryCount = (Size - sizeof(FileCasIndexHeader)) / sizeof(FileCasIndexEntry);
			FileCasIndexHeader Header;
			ObjectIndexFile.Read(&Header, sizeof(Header), 0);
			if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) &&
				(Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount))
			{
				Entries.resize(Header.EntryCount);
				ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));

				std::string InvalidEntryReason;
				for (const FileCasIndexEntry& Entry : Entries)
				{
					if (!ValidateEntry(Entry, InvalidEntryReason))
					{
						ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
						continue;
					}
					m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
				}
				OutVersion = FileCasIndexHeader::CurrentVersion;
				return Header.LogPosition;
			}
			else
			{
				ZEN_WARN("skipping invalid index file '{}'", IndexPath);
			}
		}
		return 0;
	}

	if (std::filesystem::is_directory(m_RootDirectory))
	{
		ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory);
		TCasLogFile<FileCasIndexEntry> CasLog;
		uint64_t					   TotalSize = 0;
		Stopwatch					   TotalTimer;
		const auto					   _ = MakeGuard([&] {
			ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}",
					 m_RootDirectory,
					 NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
					 CasLog.GetLogCount(),
					 NiceBytes(TotalSize));
		});

		std::filesystem::path LogPath = GetLogPath(m_RootDirectory);

		// Scan before truncating the log, then record every file found both in memory and in the new log
		std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
		CasLog.Open(LogPath, CasLogFile::Mode::kTruncate);
		for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
		{
			m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
			TotalSize += Entry.Size;
			CasLog.Append(Entry);
		}

		CasLog.Close();
	}

	return 0;
}

// Replays the log at LogPath into m_Index, skipping the first SkipEntryCount
// records (those are expected to be covered by an index snapshot).
//
// Tombstone records erase their key from m_Index; other records are inserted
// if they pass ValidateEntry and are otherwise skipped with a warning. When
// SkipEntryCount exceeds the number of records in the log, the whole log is
// replayed instead.
//
// Returns the number of log records past the skip point, or 0 when the log
// file does not exist or cannot be initialized.
uint64_t
FileCasStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
	ZEN_TRACE_CPU("FileCas::ReadLog");

	using namespace filecas::impl;

	if (!std::filesystem::is_regular_file(LogPath))
	{
		return 0;
	}

	uint64_t   LogEntryCount = 0;
	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
	});

	TCasLogFile<FileCasIndexEntry> CasLog;
	CasLog.Open(LogPath, CasLogFile::Mode::kRead);
	if (!CasLog.Initialize())
	{
		return 0;
	}

	const uint64_t TotalEntryCount = CasLog.GetLogCount();
	if (TotalEntryCount < SkipEntryCount)
	{
		ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
		SkipEntryCount = 0;
	}
	LogEntryCount = TotalEntryCount - SkipEntryCount;
	m_Index.reserve(LogEntryCount);

	uint64_t   InvalidEntryCount = 0;
	const auto ApplyRecord		 = [&](const FileCasIndexEntry& Record) {
		  // Tombstones are applied without further validation
		  if (Record.Flags & FileCasIndexEntry::kTombStone)
		  {
			  m_Index.erase(Record.Key);
			  return;
		  }

		  std::string InvalidEntryReason;
		  if (!ValidateEntry(Record, InvalidEntryReason))
		  {
			  ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
			  ++InvalidEntryCount;
			  return;
		  }

		  m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size});
	};
	CasLog.Replay(ApplyRecord, SkipEntryCount);

	if (InvalidEntryCount)
	{
		ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath);
	}
	return LogEntryCount;
}

// Walks RootDir recursively and returns one index entry (key + file size) for
// every file that sits in the sharded payload layout:
//
//     <3 hex chars>/<2 hex chars>/<35 hex chars>
//
// The three parts concatenated form the 40 character hex string of the chunk
// hash. Files whose location does not match this shape, or whose name parts
// contain non-hex characters, are ignored.
std::vector<FileCasStrategy::FileCasIndexEntry>
FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
{
	ZEN_TRACE_CPU("FileCas::ScanFolderForCasFiles");

	using namespace filecas::impl;

	std::vector<FileCasIndexEntry> Entries;
	struct Visitor : public FileSystemTraversal::TreeVisitor
	{
		Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {}
		virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
		{
			std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory);

			std::filesystem::path::string_type PathString = RelPath.native();

			// Directory part must be "xxx<sep>yy" and the file name must supply the remaining 35 characters
			if ((PathString.size() != (3 + 2 + 1)) || (File.size() != (40 - 3 - 2)))
			{
				return;
			}
			// Without the separator the assembled name would be 41 characters and not a valid hash string
			if (PathString[3] != std::filesystem::path::preferred_separator)
			{
				return;
			}
			PathString.erase(3, 1);
			PathString.append(File);

			// Only hand well-formed hex strings to the hash parser
			const auto IsHexDigit = [](std::filesystem::path::value_type Ch) {
				return ((Ch >= '0') && (Ch <= '9')) || ((Ch >= 'a') && (Ch <= 'f')) || ((Ch >= 'A') && (Ch <= 'F'));
			};
			for (const std::filesystem::path::value_type Ch : PathString)
			{
				if (!IsHexDigit(Ch))
				{
					return;
				}
			}

#if ZEN_PLATFORM_WINDOWS
			StringBuilder<64> Utf8;
			WideToUtf8(PathString, Utf8);
			IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()});
#else
			IoHash NameHash = IoHash::FromHexString(PathString);
#endif
			Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize});
		}

		// Always descend; filtering happens per file in VisitFile
		virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
									[[maybe_unused]] const path_view&			  DirectoryName) override
		{
			return true;
		}

		const std::filesystem::path&	RootDirectory;
		std::vector<FileCasIndexEntry>& Entries;
	} CasVisitor{RootDir, Entries};

	FileSystemTraversal Traversal;
	Traversal.TraverseFileSystem(RootDir, CasVisitor);
	return Entries;
}

class FileCasStoreCompactor : public GcStoreCompactor
{
public:
	FileCasStoreCompactor(FileCasStrategy& Owner, std::vector<IoHash>&& ReferencesToClean)
	: m_FileCasStrategy(Owner)
	, m_ReferencesToClean(std::move(ReferencesToClean))
	{
		m_ReferencesToClean.shrink_to_fit();
	}

	virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>&) override
	{
		ZEN_TRACE_CPU("FileCas::CompactStore");

		Stopwatch  Timer;
		const auto _ = MakeGuard([&] {
			Reset(m_ReferencesToClean);
			if (!Ctx.Settings.Verbose)
			{
				return;
			}
			ZEN_INFO("GCV2: filecas [COMPACT] '{}': RemovedDisk: {} in {}",
					 m_FileCasStrategy.m_RootDirectory,
					 NiceBytes(Stats.RemovedDisk),
					 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
		});

		ZEN_INFO("GCV2: filecas [COMPACT] '{}': {} {} files",
				 m_FileCasStrategy.m_RootDirectory,
				 Ctx.Settings.IsDeleteMode ? "Removing" : "Checking",
				 m_ReferencesToClean.size());

		size_t Skipped = 0;
		for (const IoHash& ChunkHash : m_ReferencesToClean)
		{
			FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory.c_str(), ChunkHash);
			{
				RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock);
				if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end())
				{
					// Not regarded as pruned, leave it be
					continue;
				}
				if (Ctx.IsCancelledFlag.load())
				{
					return;
				}

				if (Ctx.Settings.IsDeleteMode)
				{
					if (Ctx.Settings.Verbose)
					{
						ZEN_INFO("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'",
								 m_FileCasStrategy.m_RootDirectory,
								 Name.ShardedPath.ToUtf8());
					}
					std::error_code Ec;
					uint64_t		SizeOnDisk = std::filesystem::file_size(Name.ShardedPath.c_str(), Ec);
					if (Ec)
					{
						SizeOnDisk = 0;
					}
					bool Existed = std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
					if (Ec)
					{
						ZEN_WARN("GCV2: filecas [COMPACT] '{}': Failed deleting CAS payload file '{}'. Reason '{}'",
								 m_FileCasStrategy.m_RootDirectory,
								 Name.ShardedPath.ToUtf8(),
								 Ec.message());
						continue;
					}
					if (!Existed)
					{
						continue;
					}
					Stats.RemovedDisk += SizeOnDisk;
				}
				else
				{
					std::error_code Ec;
					bool			Existed = std::filesystem::is_regular_file(Name.ShardedPath.c_str(), Ec);
					if (Ec)
					{
						if (Ctx.Settings.Verbose)
						{
							ZEN_INFO("GCV2: filecas [COMPACT] '{}': Failed checking CAS payload file '{}'. Reason '{}'",
									 m_FileCasStrategy.m_RootDirectory,
									 Name.ShardedPath.ToUtf8(),
									 Ec.message());
						}
						continue;
					}
					if (!Existed)
					{
						continue;
					}
					Skipped++;
				}
			}
		}

		if (Skipped > 0)
		{
			if (Ctx.Settings.Verbose)
			{
				ZEN_INFO("GCV2: filecas [COMPACT] '{}': Skipped deleting of {} eligible files", m_FileCasStrategy.m_RootDirectory, Skipped);
			}
		}

		Reset(m_ReferencesToClean);
	}

	virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); }

private:
	FileCasStrategy&	m_FileCasStrategy;
	std::vector<IoHash> m_ReferencesToClean;
};

// First GC stage for the file CAS: given the set of chunk ids captured when
// the pruner was created, asks the caller which of them are unused, drops
// those from the owner's index (delete mode only) and hands the list to a
// FileCasStoreCompactor for the actual file removal.
class FileCasReferencePruner : public GcReferencePruner
{
public:
	FileCasReferencePruner(FileCasStrategy& Owner, std::vector<IoHash>&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {}

	virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); }

	// Returns a newly allocated compactor for the unused ids, or nullptr when there is nothing unused
	// or the GC was cancelled. In delete mode, each unused id found in the index is erased from it,
	// recorded as a tombstone in the log, and its size subtracted from the owner's total.
	virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx&							Ctx,
													 GcStats&						Stats,
													 const GetUnusedReferencesFunc& GetUnusedReferences) override
	{
		ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData");

		FileCasStrategy& Owner = m_FileCasStrategy;

		Stopwatch  Timer;
		const auto _ = MakeGuard([&] {
			if (Ctx.Settings.Verbose)
			{
				ZEN_INFO("GCV2: filecas [PRUNE] '{}': Count: {}, Unreferenced: {}, FreedMemory: {} in {}",
						 Owner.m_RootDirectory,
						 Stats.CheckedCount,
						 Stats.FoundCount,
						 NiceBytes(Stats.FreedMemory),
						 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
			}
		});

		std::vector<IoHash> UnusedCids = GetUnusedReferences(m_Cids);
		Stats.CheckedCount			   = m_Cids.size();
		Stats.FoundCount			   = UnusedCids.size();
		if (UnusedCids.empty())
		{
			// Nothing to collect
			return nullptr;
		}

		if (Ctx.Settings.IsDeleteMode)
		{
			std::vector<FileCasStrategy::FileCasIndexEntry> Tombstones;
			Tombstones.reserve(UnusedCids.size());

			RwLock::ExclusiveLockScope IndexLock(Owner.m_Lock);
			if (Ctx.IsCancelledFlag.load())
			{
				return nullptr;
			}

			// Pass 1: work out which unused ids are actually indexed and how much they account for
			uint64_t ReclaimedBytes = 0;
			for (const IoHash& ChunkHash : UnusedCids)
			{
				const auto Found = Owner.m_Index.find(ChunkHash);
				if (Found == Owner.m_Index.end())
				{
					continue;
				}
				const uint64_t FileSize = Found->second.Size;
				Tombstones.push_back(FileCasStrategy::FileCasIndexEntry{.Key   = ChunkHash,
																		  .Flags = FileCasStrategy::FileCasIndexEntry::kTombStone,
																		  .Size	 = FileSize});
				ReclaimedBytes += FileSize;
			}
			Owner.m_TotalSize.fetch_sub(ReclaimedBytes, std::memory_order_relaxed);

			// Pass 2: drop them from the index and persist the tombstones
			if (!Tombstones.empty())
			{
				for (const FileCasStrategy::FileCasIndexEntry& Tombstone : Tombstones)
				{
					Owner.m_Index.erase(Tombstone.Key);
					Stats.DeletedCount++;
				}
				Owner.m_CasLog.Append(Tombstones);
				Owner.m_CasLog.Flush();
			}
		}

		return new FileCasStoreCompactor(m_FileCasStrategy, std::move(UnusedCids));
	}

private:
	FileCasStrategy&	m_FileCasStrategy;
	std::vector<IoHash> m_Cids;
};

// Builds the name used to identify this store in GC output: the fixed
// "filecas: " prefix followed by the quoted root directory.
std::string
FileCasStrategy::GetGcName(GcCtx&)
{
	const std::string RootPath = m_RootDirectory.string();
	return fmt::format("filecas: '{}'", RootPath);
}

// Snapshots every key currently in the index and wraps the list in a newly
// allocated FileCasReferencePruner. Returns nullptr when the index is empty
// or the GC context has been cancelled.
GcReferencePruner*
FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
{
	ZEN_TRACE_CPU("FileCas::CreateReferencePruner");

	Stopwatch  Timer;
	const auto _ = MakeGuard([&] {
		if (Ctx.Settings.Verbose)
		{
			ZEN_INFO("GCV2: filecas [CREATE PRUNER] '{}' in {}", m_RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
		}
	});

	std::vector<IoHash> Candidates;
	{
		// Copy the keys under the shared lock; the pruner works on the copy afterwards
		RwLock::SharedLockScope IndexLock(m_Lock);
		if (m_Index.empty() || Ctx.IsCancelledFlag.load())
		{
			return nullptr;
		}

		Candidates.reserve(m_Index.size());
		for (const auto& Entry : m_Index)
		{
			Candidates.push_back(Entry.first);
		}
	}

	return new FileCasReferencePruner(*this, std::move(Candidates));
}

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

// Inserts a chunk whose payload comes from a temporary file (via
// IoBufferBuilder::MakeFromTemporaryFile) and checks that the store reports
// it as new. The stress-test subcase further down is compiled out.
TEST_CASE("cas.file.move")
{
	// specifying an absolute path here can be helpful when using procmon to dig into things
	ScopedTemporaryDirectory TempDir;  // {"d:\\filecas_testdir"};

	GcManager Gc;

	FileCasStrategy FileCas(Gc);
	FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);

	{
		std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};

		// The hash is computed from the in-memory buffer, then the same bytes are written to the payload file
		IoBuffer ZeroBytes{1024 * 1024};
		IoHash	 ZeroHash = IoHash::HashBuffer(ZeroBytes);

		BasicFile PayloadFile;
		PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate);
		PayloadFile.Write(ZeroBytes, 0);
		PayloadFile.Close();

		IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);

		// First insertion of this hash into a fresh store must be reported as new
		CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash);
		CHECK_EQ(Result.New, true);
	}

	// Disabled: kWorkers threads each insert kItemCount file-backed payloads, meeting at a barrier
	// before every insert. Payload contents depend only on the item index, not on the worker.
#	if 0
	SUBCASE("stresstest")
	{
		std::vector<IoHash> PayloadHashes;

		const int kWorkers = 64;
		const int kItemCount = 128;

		for (int w = 0; w < kWorkers; ++w)
		{
			for (int i = 0; i < kItemCount; ++i)
			{
				IoBuffer Payload{1024};
				*reinterpret_cast<int*>(Payload.MutableData()) = i;
				PayloadHashes.push_back(IoHash::HashBuffer(Payload));

				std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
				WriteFile(PayloadPath, Payload);
			}
		}

		std::barrier Sync{kWorkers};

		auto PopulateAll = [&](int w) {
			std::vector<IoBuffer> Buffers;

			for (int i = 0; i < kItemCount; ++i)
			{
				std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
				IoBuffer			  Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath);
				Buffers.push_back(Payload);
				Sync.arrive_and_wait();
				CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]);
			}
		};

		std::vector<std::jthread> Threads;

		for (int i = 0; i < kWorkers; ++i)
		{
			Threads.push_back(std::jthread(PopulateAll, i));
		}

		for (std::jthread& Thread : Threads)
		{
			Thread.join();
		}
	}
#	endif
}

// Exercises CollectGarbage on a file CAS in two rounds: first with nothing
// retained (every chunk must be gone afterwards), then with the chunks whose
// first hash byte is odd retained (exactly those must survive).
TEST_CASE("cas.file.gc")
{
	// an absolute path can be substituted here when inspecting the store with procmon
	ScopedTemporaryDirectory TempDir;  // {"d:\\filecas_testdir"};

	GcManager		Gc;
	FileCasStrategy FileCas(Gc);
	FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);

	const int			kIterationCount = 100;
	std::vector<IoHash> Keys(kIterationCount);

	// Selects the subset of chunks kept alive in the second round
	const auto IsRetained = [](const IoHash& Key) { return (Key.Hash[0] & 1) != 0; };

	// Stores one small compact-binary object per index and remembers its hash in Keys
	const auto PopulateStore = [&] {
		for (int Index = 0; Index < kIterationCount; ++Index)
		{
			CbObjectWriter Writer;
			Writer << "id" << Index;
			CbObject Object = Writer.Save();

			IoBuffer	 Payload	 = Object.GetBuffer().AsIoBuffer();
			const IoHash PayloadHash = IoHash::HashBuffer(Payload);

			FileCas.InsertChunk(Payload, PayloadHash);

			Keys[Index] = PayloadHash;
		}
	};

	// Round 1: nothing is retained, so nothing may survive
	{
		PopulateStore();

		GcContext Ctx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
		FileCas.CollectGarbage(Ctx);

		for (const IoHash& Key : Keys)
		{
			IoBuffer Chunk = FileCas.FindChunk(Key);
			CHECK(!Chunk);
		}
	}

	// Round 2: retain roughly half of the chunks
	{
		PopulateStore();

		GcContext Ctx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));

		for (const IoHash& Key : Keys)
		{
			if (!IsRetained(Key))
			{
				continue;
			}
			Ctx.AddRetainedCids(std::vector<IoHash>{Key});
		}

		FileCas.CollectGarbage(Ctx);

		for (const IoHash& Key : Keys)
		{
			IoBuffer Chunk = FileCas.FindChunk(Key);
			if (IsRetained(Key))
			{
				CHECK(Chunk);
			}
			else
			{
				CHECK(!Chunk);
			}
		}
	}
}

#endif

// Intentionally empty.
// NOTE(review): presumably exists so other code can reference a symbol from this translation unit and
// thereby keep it (and the test cases above) from being dropped by the linker - confirm against the callers.
void
filecas_forcelink()
{
}

}  // namespace zen
